package drds.plus.datanode.api;

import drds.plus.datanode.configuration.DataSourceIndexAndNeedRetryIfDailed;
import drds.plus.datanode.configuration.DataSourceWrapper;
import drds.plus.datanode.select.Selector;
import drds.plus.datasource.api.ConnectionWrapper;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.util.*;

/**
 * Logical connection that lazily holds at most one read and one write
 * {@link ConnectionWrapper} and tracks every statement opened on it.
 *
 * <pre>
 * Relevant JDBC rules:
 * 1. Closing a Connection automatically closes the statements opened on it, so the
 *    Connection has to keep a reference to every statement it has opened.
 * 2. Retry scenario
 *    1: A query on the first statement is routed to db1 and succeeds. A second statement is
 *       then created and its query fails on db1:
 *    stmt1 = Connection.createDataNodeStatement
 *    rs1 = stmt1.executeQuery
 *    --create connection on db1 and execute success
 *    stmt2 = conn..createDataNodeStatement
 *    rs2 = stmt2..executeQuery
 *    --db1 failed then...
 *    If we now retry on db2, should the connection to db1 be closed?
 *    a: If it is closed, the real stmt and rs opened on it are closed too. Once db2 succeeds
 *       the user sees no exception and believes stmt1 and rs1 are fine, while they are in
 *       fact already broken.
 *    b: If it is not closed, TGroupConnection would hold several baseConnections ...
 *
 *    From the scenario above the following retry principles are derived
 *    (TGroupConnection / baseConnection / DBGroup are the names used by the original
 *    design notes; presumably they map to this class, its connection wrappers and the
 *    data node - confirm):
 *    a. Within one TGroupConnection, a retry across the DBGroup happens only on the first
 *       real interaction with the database, i.e. when a db result must be returned to the user.
 *    b. Once the retry succeeded on some database, every later operation on this
 *       TGroupConnection goes to that database only; no more retries, errors are thrown directly.
 *    c. While the first real connection is being established with retries, baseConnection may
 *       be replaced. Once a retry succeeded, baseConnection no longer changes.
 *
 * This simplifies many things without fundamentally affecting functionality, and avoids
 * confusing behaviour for the user that would result from mishandled state.
 * </pre>
 */
//@Data
@Slf4j
public class Connection {

    /**
     * Index built from {@link Selector#NOT_EXIST_USER_SPECIFIED_INDEX} and {@code false}.
     */
    public static final DataSourceIndexAndNeedRetryIfDailed DATASOURCE_INDEX = new DataSourceIndexAndNeedRetryIfDailed(Selector.NOT_EXIST_USER_SPECIFIED_INDEX, false);
    /**
     * Connection currently used for writes (and for everything inside a transaction); may be null.
     */
    @Setter
    @Getter
    public ConnectionWrapper writeConnectionWrapper;
    @Setter
    @Getter
    private DatasourceManager datasourceManager;

    /**
     * Connection currently used for auto-commit reads; may be null or the same object as the write connection.
     */
    @Setter
    @Getter
    private ConnectionWrapper readConnectionWrapper;
    /**
     * Data source the read connection was obtained from.
     */
    @Setter
    @Getter
    private DataSourceWrapper readDataSourceWrapper;
    /**
     * Data source the write connection was obtained from.
     */
    @Setter
    @Getter
    private DataSourceWrapper writeDataSourceWrapper;
    /**
     * Statements opened on this connection; closed together with it.
     */
    @Setter
    @Getter
    private Set<Statement> statementSet = Collections.synchronizedSet(new HashSet<Statement>(2));

    @Setter
    @Getter
    private boolean closed;
    @Setter
    @Getter
    private boolean autoCommit = true; // JDBC specification: a new connection starts in auto-commit mode

    /**
     * Creates a connection without any underlying connection opened yet.
     *
     * @param datasourceManager manager handed to every statement created on this connection
     */
    public Connection(DatasourceManager datasourceManager) {
        this.datasourceManager = datasourceManager;
    }

    /**
     * Returns the already established underlying connection that can serve the request,
     * i.e. the connection used by the previous operation of the transaction.
     *
     * @param sql    statement text passed to {@code DataNodeHintParser.convertHint2Index}; when null,
     *               or when no index is parsed from it, the thread-local index is used instead
     * @param isRead whether the connection is needed for a read
     * @return a reusable connection, or null when none is available
     *         (presumably the caller then calls {@link #createConnectionWrapper} - confirm)
     * @throws SQLException if, inside a transaction, the requested index does not match the data source
     *                      of the write connection, or if syncing auto-commit on a reused connection fails
     */
    ConnectionWrapper getConnectionWrapper(String sql, boolean isRead) throws SQLException {
        // An index parsed from the SQL wins; otherwise fall back to the thread-local index.
        DataSourceIndexAndNeedRetryIfDailed requestedIndex = sql == null ? null : DataNodeHintParser.convertHint2Index(sql);
        if (requestedIndex == null) {
            requestedIndex = ThreadLocalDataSourceIndex.getIndex();
        }

        // A user-specified index is present.
        if (requestedIndex.dataSourceIndex != Selector.NOT_EXIST_USER_SPECIFIED_INDEX) {
            // Inside a transaction, switching to a different data source index is an error.
            if (!autoCommit) {
                if (writeDataSourceWrapper != null && !writeDataSourceWrapper.isMatchDataSourceIndex(requestedIndex.dataSourceIndex)) {
                    throw new SQLException("Transaction in another dataSourceIndexAndNeedRetryIfDailed: " + requestedIndex);
                }
            }
            // If the requested index differs from the one of the cached connection, that
            // connection is of no use any more: close it so that it gets re-created.
            if (isRead) {
                if (readDataSourceWrapper != null && !readDataSourceWrapper.isMatchDataSourceIndex(requestedIndex.dataSourceIndex)) {
                    closeReadConnection();
                }
            } else {
                if (writeDataSourceWrapper != null && !writeDataSourceWrapper.isMatchDataSourceIndex(requestedIndex.dataSourceIndex)) {
                    closeWriteConnection();
                }
            }
        }

        // To make sure transactions are finished correctly, only the write connection is
        // handed out while in a transaction.
        if (isRead && autoCommit) {
            // Reuse the write connection whenever it exists and its database is readable;
            // otherwise return the read connection.
            // Write-then-read: after reusing the write connection for a read, the read connection stays null.
            return writeConnectionWrapper != null && writeDataSourceWrapper.hasReadWeight() ? writeConnectionWrapper : readConnectionWrapper;
        } else {
            if (writeConnectionWrapper != null) {
                return writeConnectionWrapper;
            } else if (readConnectionWrapper != null && readDataSourceWrapper.hasWriteWeight()) {
                // No write connection yet, but a read connection exists on a writable database: reuse it.
                // Promote it to write connection so that commit/rollback reach it.
                writeConnectionWrapper = readConnectionWrapper;
                writeDataSourceWrapper = readDataSourceWrapper;
                // Apply the current transaction mode to the (new) write connection.
                if (writeConnectionWrapper.getAutoCommit() != autoCommit) {
                    writeConnectionWrapper.setAutoCommit(autoCommit);
                }
                return writeConnectionWrapper;
            } else {
                return null;
            }
        }
    }

    /**
     * Obtains a lower-level (possibly not physical) connection from the given data source and
     * registers it as the read or write connection of this object.
     * Package-private: per the original notes only used by TGroupStatement and
     * TGroupPreparedStatement (presumably today's Statement / PreparedStatement - confirm).
     *
     * @param dataSourceWrapper data source to take the connection from
     * @param isRead            whether the connection is requested for a read
     * @return the newly obtained connection
     * @throws SQLException if obtaining the connection or setting its auto-commit mode fails
     */
    ConnectionWrapper createConnectionWrapper(DataSourceWrapper dataSourceWrapper, boolean isRead) throws SQLException {
        // Only happens when the read/write connection is established for the first time; it is reused afterwards.
        ConnectionWrapper connectionWrapper = dataSourceWrapper.getDataSource().getDataSourceWrapper().getConnectionWrapper();

        // To make sure transactions are finished correctly, inside a transaction only the write connection is set.
        setConnectionWrapper(dataSourceWrapper, connectionWrapper, isRead && autoCommit);
        // setAutoCommit is only called on write connections, consistent with Connection#setAutoCommit.
        if (!isRead || !autoCommit) {
            connectionWrapper.setAutoCommit(autoCommit); // a new connection must match the current auto-commit state
        }
        return connectionWrapper;
    }

    /**
     * Replaces the read or write connection (and its data source), closing the previous one
     * unless it is shared with the other role.
     */
    private void setConnectionWrapper(DataSourceWrapper dataSourceWrapper, ConnectionWrapper connectionWrapper, boolean isRead) {
        if (connectionWrapper == null) {
            log.warn("setConnectionWrapper to null !!");
        }
        if (isRead) {
            closeReadConnection();
            readConnectionWrapper = connectionWrapper;
            this.readDataSourceWrapper = dataSourceWrapper;
        } else {
            closeWriteConnection();
            writeConnectionWrapper = connectionWrapper;
            this.writeDataSourceWrapper = dataSourceWrapper;
        }
    }

    /**
     * Closes and forgets the read connection, unless it is also the write connection.
     * A failure to close is logged, not propagated.
     */
    private void closeReadConnection() {
        // Read and write connection may be the same object; if the other role still uses it, do not close it.
        if (readConnectionWrapper != null && readConnectionWrapper != writeConnectionWrapper) {
            try {
                readConnectionWrapper.close(); // the old connection has to be closed
            } catch (SQLException e) {
                log.error("close readConnectionWrapper failed.", e);
            }
            readDataSourceWrapper = null;
            readConnectionWrapper = null;
        }
    }

    /**
     * Closes and forgets the write connection, unless it is also the read connection.
     * A failure to close is logged, not propagated.
     */
    private void closeWriteConnection() {
        // Read and write connection may be the same object; if the other role still uses it, do not close it.
        if (writeConnectionWrapper != null && readConnectionWrapper != writeConnectionWrapper) {
            try {
                writeConnectionWrapper.close(); // the old connection has to be closed
            } catch (SQLException e) {
                log.error("close writeConnectionWrapper failed.", e);
            }
            writeDataSourceWrapper = null;
            writeConnectionWrapper = null;
        }
    }

    /**
     * Stops tracking the given statement; logs a warning if it was not tracked.
     */
    void removeOpenedStatements(Statement statement) {
        if (!statementSet.remove(statement)) {
            log.warn("current statmenet ：{} doesn't exist!", statement);
        }
    }

    /**
     * @throws SQLException if this connection has already been closed
     */
    private void checkClosed() throws SQLException {
        if (closed) {
            throw new SQLException("No operations allowed after connection closed.");
        }
    }

    /**
     * Copies the tracked statements while holding the set's monitor, as required for
     * iterating a {@link Collections#synchronizedSet}, so callers can iterate without locking.
     */
    private List<Statement> snapshotStatements() {
        synchronized (statementSet) {
            return new ArrayList<Statement>(statementSet);
        }
    }

    /**
     * Throws one exception describing all collected failures, or returns normally when there are none.
     * The first failure becomes the cause; the remaining ones are chained via
     * {@link SQLException#setNextException}.
     */
    private static void throwIfAny(List<SQLException> failures, String message) throws SQLException {
        if (failures.isEmpty()) {
            return;
        }
        Iterator<SQLException> remaining = failures.iterator();
        SQLException aggregate = new SQLException(message, remaining.next());
        while (remaining.hasNext()) {
            aggregate.setNextException(remaining.next());
        }
        throw aggregate;
    }

    /**
     * Id of the write data source for log messages; tolerates a missing data source so that
     * logging inside a catch block can never mask the exception being handled.
     */
    private String writeDataSourceIdForLog() {
        return writeDataSourceWrapper == null ? "unknown" : String.valueOf(writeDataSourceWrapper.getDataSourceId());
    }

    public boolean isClosed() throws SQLException {
        return closed;
    }

    /**
     * Closes all tracked statements and both underlying connections. Calling it again is a no-op.
     * All resources are attempted even if some fail; state is reset and the thread-local
     * data source index is cleared in any case.
     *
     * @throws SQLException if closing any statement or connection failed (failures are chained)
     */
    public void close() throws SQLException {
        if (closed) {
            return;
        }
        closed = true;
        List<SQLException> failures = new LinkedList<SQLException>();
        try {
            // Close the statements first.
            // NOTE(review): the 'false' flag presumably tells the statement not to deregister
            // itself from this connection - confirm against Statement#close(boolean).
            for (Statement statement : snapshotStatements()) {
                try {
                    statement.close(false);
                } catch (SQLException e) {
                    failures.add(e);
                }
            }
            try {
                if (readConnectionWrapper != null && !readConnectionWrapper.isClosed()) {
                    readConnectionWrapper.close();
                }
            } catch (SQLException e) {
                failures.add(e);
            }
            try {
                // When read and write share one object, the isClosed() check skips the second close.
                if (writeConnectionWrapper != null && !writeConnectionWrapper.isClosed()) {
                    writeConnectionWrapper.close();
                }
            } catch (SQLException e) {
                failures.add(e);
            }
        } finally {
            statementSet.clear();
            readConnectionWrapper = null;
            writeConnectionWrapper = null;
            ThreadLocalDataSourceIndex.clearIndex();
        }
        throwIfAny(failures, "close tconnection");
    }

    /**
     * Creates a statement bound to this connection and tracks it.
     *
     * @throws SQLException if this connection is closed
     */
    public Statement createDataNodeStatement() throws SQLException {
        checkClosed();
        Statement statement = new Statement(this.datasourceManager.getApplicationId(), datasourceManager, this);
        statementSet.add(statement);
        return statement;
    }

    /**
     * Creates a prepared statement for {@code sql} bound to this connection and tracks it.
     *
     * @throws SQLException if this connection is closed
     */
    public PreparedStatement prepareDataNodePreparedStatement(String sql) throws SQLException {
        checkClosed();
        PreparedStatement dataNodePreparedStatement = new PreparedStatement(datasourceManager, this, sql, this.datasourceManager.getApplicationId());
        statementSet.add(dataNodePreparedStatement);
        return dataNodePreparedStatement;
    }

    /**
     * Same as {@link #prepareDataNodePreparedStatement(String)}, additionally passing
     * {@code autoGeneratedKeys} to the created statement.
     *
     * @throws SQLException if this connection is closed
     */
    public PreparedStatement prepareDataNodePreparedStatement(String sql, int autoGeneratedKeys) throws SQLException {
        PreparedStatement dataNodePreparedStatement = prepareDataNodePreparedStatement(sql);
        dataNodePreparedStatement.setAutoGeneratedKeys(autoGeneratedKeys);
        return dataNodePreparedStatement;
    }

    /**
     * @throws SQLException if this connection is closed
     */
    public boolean getAutoCommit() throws SQLException {
        checkClosed();
        return autoCommit;
    }

    /**
     * Changes the auto-commit mode and propagates it to the write connection, if one exists.
     *
     * @throws SQLException if this connection is closed or the write connection rejects the change
     */
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        checkClosed();
        if (this.autoCommit == autoCommit) {
            // Rule out the two most common cases first (true==true, false==false): nothing to do.
            return;
        }
        this.autoCommit = autoCommit;
        if (this.writeConnectionWrapper != null) {
            this.writeConnectionWrapper.setAutoCommit(autoCommit);
        }
    }

    /**
     * Commits on the write connection; a no-op in auto-commit mode or when no write connection exists.
     *
     * @throws SQLException if this connection is closed or the underlying commit fails
     */
    public void commit() throws SQLException {
        checkClosed();
        if (autoCommit) {
            return;
        }
        if (writeConnectionWrapper != null) {
            try {
                writeConnectionWrapper.commit();
            } catch (SQLException e) {
                log.error("Commit failed on " + writeDataSourceIdForLog() + ":" + e.getMessage(), e);
                throw e;
            }
        }
    }

    /**
     * Rolls back on the write connection; a no-op in auto-commit mode or when no write connection exists.
     *
     * @throws SQLException if this connection is closed or the underlying rollback fails
     */
    public void rollback() throws SQLException {
        checkClosed();
        if (autoCommit) {
            return;
        }
        if (writeConnectionWrapper != null) {
            try {
                writeConnectionWrapper.rollback();
            } catch (SQLException e) {
                log.error("Rollback failed on " + writeDataSourceIdForLog() + ":" + e.getMessage(), e);
                throw e;
            }
        }
    }

    /**
     * Cancels every tracked statement and then stops tracking them. A no-op when already closed.
     *
     * @throws SQLException if cancelling any statement failed (failures are chained)
     */
    public void cancelQuery() throws SQLException {
        if (closed) {
            return;
        }
        List<SQLException> failures = new LinkedList<SQLException>();
        try {
            for (Statement statement : snapshotStatements()) {
                try {
                    statement.cancel();
                } catch (SQLException e) {
                    failures.add(e);
                }
            }
        } finally {
            // NOTE(review): cancelled statements are dropped from tracking here, so a later
            // close() will not close them explicitly - confirm this is intended.
            statementSet.clear();
        }
        throwIfAny(failures, "close tconnection");
    }

    /**
     * Cancels all running queries and then closes this connection; close is attempted even
     * if cancelling fails. A no-op when already closed.
     *
     * @throws SQLException if cancelling or closing failed (failures are chained)
     */
    public void kill() throws SQLException {
        if (closed) {
            return;
        }
        List<SQLException> failures = new LinkedList<SQLException>();
        try {
            // Cancel all queries currently running.
            try {
                this.cancelQuery();
            } catch (SQLException e) {
                failures.add(e);
            }
        } finally {
            try {
                this.close();
            } catch (SQLException e) {
                failures.add(e);
            }
        }
        throwIfAny(failures, "close tconnection");
    }


}
